Big Data Finance Pipeline
Real-time cryptocurrency and FX data processing
Adam Kaniasty, Igor Kołodziej
Agenda
Project Overview : What we’re building
Solution & Architecture : System design and technology stack
Data Sources : Binance WebSocket and NBP API
Data Ingestion : NiFi flows for crypto and FX data
Data Storage : HDFS three-layer architecture
Data Processing : ETL transformations
Analytics : Returns, correlations, and aggregations
Serving Layer : Hive SQL and HBase fast access
Results : End-to-end pipeline verification
This presentation covers the complete Big Data pipeline from data ingestion through storage, processing, analytics, and serving layers, with concrete evidence of successful execution.
Project Overview
Project Goals:
Build an end-to-end Big Data pipeline using Apache ecosystem
Handle real-time data streams (Binance WebSocket) and batch data (NBP API)
Implement three-layer data architecture (Raw → Curated → Aggregated)
Process and analyze financial data (cryptocurrency and FX rates)
Provide multiple query interfaces (SQL via Hive, fast reads via HBase)
Technical Requirements:
Ingestion : Real-time WebSocket streams and scheduled API polling
Storage : Distributed file system (HDFS) with partitioning strategy
Processing : ETL transformations using Apache Spark
Analytics : Compute returns, correlations, and aggregations
Serving : SQL interface (Hive) and fast random access (HBase)
Verification : Automated end-to-end testing pipeline
This project demonstrates a complete Big Data pipeline implementation for a university course, showcasing real-world data processing patterns and technologies.
The Solution
Big Data Pipeline Architecture:
graph LR
Binance["Binance<br/>WebSocket"] --> NiFi["Apache<br/>NiFi"]
NBP["NBP<br/>API"] --> NiFi
NiFi --> Kafka["Kafka<br/>(Optional)"]
Kafka --> HDFS["HDFS<br/>Storage"]
NiFi --> HDFS
HDFS --> Spark["Apache<br/>Spark"]
Spark --> Hive["Apache<br/>Hive"]
Spark --> HBase["Apache<br/>HBase"]
Key Capabilities:
Unified ingestion for real-time and batch data
Distributed storage with partitioning
Scalable processing and analytics
Multiple query interfaces
Our solution is a unified Big Data pipeline that handles both real-time streams and batch processing through a well-architected system using Apache technologies.
Technology Stack
Core Components:
Apache NiFi 1.27.0 - Data orchestration
Apache Kafka (Confluent Platform 7.5.0) - Message buffering
Apache HDFS - Distributed file storage
Apache Spark 3.5.1 - Data processing
Apache Hive 2.3.2 - SQL interface
Apache HBase (latest) - Fast serving layer
We chose the Apache ecosystem for its maturity, scalability, and integration capabilities. Each component serves a specific purpose in our pipeline.
Data Sources: Binance WebSocket
Binance WebSocket:
Endpoint: wss://stream.binance.com:9443/stream
Stream: btcusdt@aggTrade
Frequency: Real-time
Format: JSON
Sample Data:
{
  "e": "aggTrade",
  "E": 1672515782136,
  "s": "BTCUSDT",
  "a": 12345,
  "p": "16800.00",
  "q": "0.001",
  "f": 100,
  "l": 105,
  "T": 1672515782000,
  "m": false
}
Binance WebSocket provides real-time cryptocurrency trade data through a persistent WebSocket connection, delivering aggregated trade events as they occur.
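The field mapping from the sample above can be sketched in plain Python; the helper name and output keys below are illustrative, not part of the pipeline:

```python
import json

def normalize_agg_trade(message: str) -> dict:
    """Map Binance aggTrade short field names to descriptive ones."""
    event = json.loads(message)
    return {
        "symbol": event["s"],
        "price": float(event["p"]),          # prices arrive as strings
        "quantity": float(event["q"]),
        "event_time": event["T"],            # trade time, epoch milliseconds
        "is_buyer_maker": event["m"],
    }

sample = (
    '{"e":"aggTrade","E":1672515782136,"s":"BTCUSDT","a":12345,'
    '"p":"16800.00","q":"0.001","f":100,"l":105,"T":1672515782000,"m":false}'
)
print(normalize_agg_trade(sample))
```

Note that `p` and `q` are JSON strings, not numbers, so a cast is needed before any arithmetic downstream.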
Data Sources: NBP API
NBP API:
Endpoint: http://api.nbp.pl/api/exchangerates/tables/A/
Frequency: Daily (business days)
Format: JSON array
Sample Data:
{
  "table": "A",
  "no": "001/A/NBP/2026",
  "effectiveDate": "2026-01-02",
  "rates": [
    {
      "currency": "dolar amerykański",
      "code": "USD",
      "mid": 3.6868
    },
    {
      "currency": "euro",
      "code": "EUR",
      "mid": 4.0123
    }
  ]
}
NBP API provides daily exchange rates for major currencies, published once per business day by the Polish National Bank.
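Splitting the `rates` array into per-currency records (what the NiFi SplitJson step does later) can be sketched as follows; the function name is illustrative:

```python
import json

def split_rates(table_json: str) -> list:
    """Flatten an NBP table-A response into one record per currency,
    carrying effectiveDate down to each record."""
    table = json.loads(table_json)
    return [
        {"code": r["code"], "mid": r["mid"], "effective_date": table["effectiveDate"]}
        for r in table["rates"]
    ]

sample = json.dumps({
    "table": "A",
    "no": "001/A/NBP/2026",
    "effectiveDate": "2026-01-02",
    "rates": [
        {"currency": "dolar amerykański", "code": "USD", "mid": 3.6868},
        {"currency": "euro", "code": "EUR", "mid": 4.0123},
    ],
})
print(split_rates(sample))
```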
Data Ingestion: NiFi - Crypto Flow
Processor Details:
1. WebSocket : Establishes a persistent connection to the Binance WebSocket API and subscribes to the btcusdt@aggTrade stream for real-time trade events
2. ValidateRecord : Validates the JSON schema: checks required fields (e, E, s, a, p, q, T) and ensures data types are correct
3. UpdateRecord : Enriches data: extracts symbol from s, price from p, quantity from q, event_time from T, and normalizes field names for downstream processing
4. UpdateAttribute : Adds partitioning metadata: extracts date, hour, and minute from the timestamp and sets HDFS path attributes for efficient storage organization
5. PublishKafkaRecord : Publishes validated records to the Kafka topic crypto-trades for buffering and resilience
6. ConsumeKafkaRecord : Consumes records from the Kafka topic, ensuring reliable delivery even if downstream processors are temporarily unavailable
7. MergeRecord : Aggregates records into per-minute windows, combining multiple trade events into a single CSV record per minute
8. PutHDFS : Writes merged CSV files to the HDFS raw layer with date/hour partitioning
Flow File : crypto-flow-kafka-4.json (stored in nifi/backups/)
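The UpdateAttribute step above derives the HDFS partition path from the trade timestamp; a minimal Python sketch of that derivation (the helper name and base path are taken from the storage layout shown later, the function itself is illustrative):

```python
from datetime import datetime, timezone

def hdfs_partition_path(event_time_ms: int,
                        base: str = "/data/finance/raw/crypto-trades") -> str:
    """Build the date/hour partition path from an aggTrade timestamp (epoch ms)."""
    ts = datetime.fromtimestamp(event_time_ms / 1000, tz=timezone.utc)
    return f"{base}/date={ts:%Y-%m-%d}/hour={ts:%H}"

print(hdfs_partition_path(1672515782000))
# /data/finance/raw/crypto-trades/date=2022-12-31/hour=19
```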
Data Ingestion: NiFi - NBP Flow
Processor Details:
1. InvokeHTTP : Fetches daily exchange rates from the NBP API endpoint http://api.nbp.pl/api/exchangerates/tables/A/ on a daily (business-day) schedule
2. Extract Metadata : Extracts table metadata: table, no (publication number), and effectiveDate from the API response
3. SplitJson : Splits the rates array into individual JSON records, one per currency
4. Extract Currency Fields : Extracts the specific fields: currency (full name), code (ISO code like USD, EUR), and mid (exchange rate value)
5. UpdateAttribute : Sets HDFS attributes: extracts the date from effectiveDate and constructs the filename as nbp_rate_{CODE}_{DATE}.json
6. PutHDFS : Writes individual currency records to the HDFS raw layer with date partitioning
Flow File : NBP.json (stored in nifi/backups/)
NiFi orchestrates the ingestion process. It handles the complexity of connecting to different sources, validating data, and writing to HDFS with proper partitioning.
Data Storage: HDFS
Three-Layer Architecture:
Raw Layer : CSV/JSON, partitioned by date/hour
Curated Layer : Parquet, hourly aggregates (crypto) and daily rates (FX)
Aggregated Layer : Parquet, daily returns, correlations, and monthly facts
HDFS Directory Structure:
graph TD
Root["/data/finance/"] --> Raw["raw/"]
Root --> Curated["curated/"]
Root --> Agg["aggregated/"]
Raw --> CryptoRaw["crypto-trades/<br/>date=YYYY-MM-DD/<br/>hour=HH/"]
Raw --> NbpRaw["nbp/<br/>date=YYYY-MM-DD/"]
Curated --> CryptoCur["crypto/<br/>trades_agg_hourly/"]
Curated --> NbpCur["nbp/<br/>fx_daily/"]
Agg --> Daily["daily/"]
Agg --> Monthly["monthly/"]
Once data is ingested, it’s stored in HDFS using a three-layer architecture. This separation allows us to optimize each layer for its specific purpose.
Data Processing: ETL
Crypto ETL (etl_crypto.py):
Read CSV from raw layer
Cast and validate data types
Group by symbol, date, hour
Compute aggregates: price_open, price_close, price_high, price_low, price_avg, price_p95, volume_base, volume_quote, trade_count
Write Parquet to curated layer
Partitioned by date and hour
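The hourly aggregation step can be sketched in plain Python; the real job runs as a Spark groupBy/agg in etl_crypto.py, and the helper below (including its simple p95 index) is only illustrative:

```python
from statistics import mean

def aggregate_hour(trades: list) -> dict:
    """Hourly aggregate over trades (dicts with price/qty), sorted by time.
    Mirrors the metrics listed above: OHLC, average, p95, volume, count."""
    prices = [t["price"] for t in trades]
    ranked = sorted(prices)
    p95 = ranked[min(len(ranked) - 1, int(0.95 * len(ranked)))]  # naive percentile
    return {
        "price_open": prices[0],
        "price_close": prices[-1],
        "price_high": max(prices),
        "price_low": min(prices),
        "price_avg": mean(prices),
        "price_p95": p95,
        "volume_base": sum(t["qty"] for t in trades),
        "trade_count": len(trades),
    }

trades = [{"price": 16800.0, "qty": 0.001},
          {"price": 16810.0, "qty": 0.002},
          {"price": 16795.0, "qty": 0.003}]
print(aggregate_hour(trades))
```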
NBP ETL (etl_nbp.py):
Read JSON from raw layer
Extract date from filename (YYYYMMDD format)
Cast mid (rate) to double
Handle duplicates (keep latest by load_ts)
Write Parquet to curated layer
Partitioned by year and month
Output : Standardized, validated, partitioned Parquet files
The ETL layer transforms raw data into a curated format. We aggregate high-frequency crypto trades into hourly summaries and standardize FX rates into daily records.
Analytics: Spark
FX Analytics (analytics_fx_crypto.py):
Read from curated fx_daily table (Parquet format)
Use window functions partitioned by currency code, ordered by date
Compute lagged value: mid_lag = LAG(mid) OVER (PARTITION BY code ORDER BY fx_date)
Calculate daily returns: ret_1d = (mid / mid_lag) - 1.0
Write output to aggregated layer partitioned by year/month
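The lag-and-divide step above can be sketched in plain Python for a single currency; the pipeline itself uses Spark window functions, so this helper is illustrative only:

```python
def daily_returns(series: list) -> list:
    """ret_1d = mid / mid_lag - 1, mirroring
    LAG(mid) OVER (PARTITION BY code ORDER BY fx_date) for one currency.
    Input: (fx_date, mid) tuples; output: (fx_date, ret_1d) with None first."""
    out = []
    prev = None
    for fx_date, mid in sorted(series):
        out.append((fx_date, None if prev is None else mid / prev - 1.0))
        prev = mid
    return out

usd = [("2026-01-02", 3.6868), ("2026-01-05", 3.7000)]
print(daily_returns(usd))
```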
Analytics: Spark
Crypto Analytics:
Extract daily closes from hourly aggregates (select max price_close per symbol/date)
Use window functions partitioned by symbol, ordered by date
Calculate daily returns: ret_1d = (close / close_lag) - 1.0
Write daily closes and returns to aggregated layer
Compute monthly facts: average returns, volatility (stddev), trading days, last close
Write monthly aggregations to aggregated layer
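The monthly-facts step can be sketched as follows; the field names mirror the list above, but the function itself is a pure-Python stand-in for the Spark aggregation:

```python
from statistics import mean, stdev

def monthly_facts(daily: list) -> dict:
    """Monthly stats from daily rows (dicts with ret_1d and close), sorted by date:
    average return, volatility (stddev of returns), trading days, last close."""
    rets = [d["ret_1d"] for d in daily if d["ret_1d"] is not None]
    return {
        "avg_ret_1d": mean(rets),
        "volatility": stdev(rets) if len(rets) > 1 else None,
        "trading_days": len(daily),
        "last_close": daily[-1]["close"],
    }

jan = [{"ret_1d": None, "close": 16800.0},   # first day has no lagged return
       {"ret_1d": 0.01, "close": 16968.0},
       {"ret_1d": -0.005, "close": 16883.2}]
print(monthly_facts(jan))
```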
Analytics: Spark
Correlation Analysis:
Join BTCUSDT daily returns with USD/PLN returns on date
Compute a 63-day rolling Pearson correlation using window functions; while the joined history is shorter than 63 days the correlation is null
Write output to aggregated layer
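The rolling-correlation logic can be sketched in plain Python (the pipeline computes it with Spark window functions; the trailing-window formulation below is an assumption about the exact windowing):

```python
from math import sqrt

def rolling_corr(xs: list, ys: list, window: int = 63) -> list:
    """Trailing Pearson correlation over the last `window` joined observations;
    None while history is shorter than the window (matching the pipeline's null)."""
    out = []
    for i in range(len(xs)):
        if i + 1 < window:
            out.append(None)
            continue
        wx, wy = xs[i + 1 - window : i + 1], ys[i + 1 - window : i + 1]
        mx, my = sum(wx) / window, sum(wy) / window
        cov = sum((a - mx) * (b - my) for a, b in zip(wx, wy))
        var_x = sum((a - mx) ** 2 for a in wx)
        var_y = sum((b - my) ** 2 for b in wy)
        out.append(cov / sqrt(var_x * var_y) if var_x and var_y else None)
    return out

# window=3 for illustration: first two entries are None (window not yet full),
# then 1.0 for these perfectly correlated series
print(rolling_corr([1.0, 2.0, 3.0, 4.0], [2.0, 4.0, 6.0, 8.0], window=3))
```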
The analytics layer computes business insights: daily returns, volatility metrics, and cross-asset correlations. These computations use Spark’s window functions for efficient processing.
Serving Layer: Hive
Apache Hive Overview:
SQL-on-Hadoop engine providing SQL interface over distributed data
Uses external tables pointing to Parquet files in HDFS
Discovers new partitions via MSCK REPAIR TABLE
Enables ad-hoc queries without writing Spark code
External Tables:
trades_agg_hourly - Hourly crypto aggregates (partitioned by date, hour)
fx_daily - Daily FX rates (partitioned by year, month)
fx_daily_returns - FX daily returns (partitioned by year, month)
crypto_daily - Crypto daily closes and returns (partitioned by symbol)
corr_btc_usdpln_63d - BTC/USDPLN 63-day rolling correlation
crypto_monthly_facts - Monthly crypto statistics (partitioned by symbol)
Serving Layer: Hive
Key Features:
SQL Interface : Standard SQL queries over Parquet data in HDFS
Partition Pruning : Automatically filters partitions based on WHERE clauses
Schema-on-Read : Tables defined by CREATE EXTERNAL TABLE statements
No Data Movement : Queries execute directly on HDFS data
Example Query:
SELECT fx_date, code, mid, ret_1d
FROM finance.fx_daily_returns
WHERE year = 2026 AND month = 1
ORDER BY fx_date DESC, code
LIMIT 10;
Serving Layer: HBase
Facts Table: finance:facts_daily
Row key design : Salt + Symbol + Date (e.g., 7|BTCUSDT|20260105)
Column families : metrics, price, volume, correlation
Stored metrics : metrics:ret_1d, price:avg, volume:sum, volume:count, correlation:btc_usdpln_63d
Loading process : load_hbase_facts.py reads from Spark analytics outputs and generates HBase shell put commands
Execution : hbase/load-facts.sh executes the generated commands to populate the table
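The put-command generation can be sketched as below. The salt formula (hash of symbol+date modulo a bucket count) is an assumption for illustration; the real scheme lives in load_hbase_facts.py:

```python
def row_key(symbol: str, date: str, buckets: int = 10) -> str:
    """Salted row key 'salt|SYMBOL|YYYYMMDD', e.g. 7|BTCUSDT|20260105.
    Salting spreads sequential keys across regions to avoid hotspotting.
    NOTE: this salt formula is hypothetical, not the pipeline's actual one."""
    salt = sum(ord(c) for c in symbol + date) % buckets
    return f"{salt}|{symbol}|{date}"

def put_command(symbol: str, date: str, ret_1d: float) -> str:
    """Render one HBase shell put for the metrics column family."""
    return (f"put 'finance:facts_daily', '{row_key(symbol, date)}', "
            f"'metrics:ret_1d', '{ret_1d}'")

print(put_command("BTCUSDT", "20260105", 0.0123))
```

The same pattern would emit puts for the price, volume, and correlation column families, one line per cell, for hbase/load-facts.sh to replay.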
Features:
Fast random reads : Millisecond latency for point queries by row key
Use case : Real-time lookups, API serving, point queries
Optimized for : Single-row retrievals rather than full table scans
Results
End-to-End Pipeline Verification (Test Date: 2026-01-12)
Test Script: test-full-pipeline.sh
Automated verification of the complete pipeline:
1. Spark ETL → Transforms raw CSV/JSON to curated Parquet
2. Spark Analytics → Computes returns, correlations, monthly facts
3. Analytics Sanity Checks → Validates data quality and consistency
4. Hive Tables → Registers external tables for SQL queries
5. Hive Views → Creates analytics views for aggregated data
6. HBase Facts → Loads selected metrics for fast random access
Execution Results:
Data Ingestion:
CSV files written to HDFS raw layer
JSON files written to HDFS raw layer
NiFi processors: All executed successfully
ETL Output:
Crypto: CSV → Parquet hourly aggregates (curated layer)
NBP: JSON → Parquet daily rates (curated layer)
Results
Analytics Verified:
FX daily returns : 27 currency pairs computed
Crypto daily : 1 symbol (BTCUSDT) aggregated
Correlation : Requires 63 days minimum (insufficient data)
Hive Tables Registered:
trades_agg_hourly - Hourly crypto aggregates
fx_daily - Daily FX rates
fx_daily_returns - FX daily returns
crypto_daily - Crypto daily closes and returns
HBase Facts:
Table finance:facts_daily loaded and queryable
Status : All 6 test stages completed successfully